package ez

import (
	"context"
	"fmt"
	"gitee.com/dreamwood/ez-go/tools"
	"github.com/nsqio/go-nsq"
	"io"
	"log"
	"net/http"
	"net/url"
)

type NsqConfig struct {
	SubAddr string `yaml:"sub-addr"`
	PubAddr string `yaml:"pub-addr"`
}

type MyNsqLogger struct {
}

func (n *MyNsqLogger) Output(calldepth int, s string) error {
	//lookupd (http://127.10.0.1:4161/lookup?topic=ez.topic.offline) - got response 404 Not Found
	matches := tools.MatchAll(s, `http://(.+?)/lookup\?topic=(.+?)\) - got response 404 Not Found`)
	if len(matches) > 0 && len(matches[0]) > 2 {
		addr := matches[0][1]
		topic := matches[0][2]

		//以form表单的形式提交
		formData := url.Values{}
		formData.Add("topic", topic)
		resp, err := http.Post(fmt.Sprintf("http://%s/topic/create?topic=%s", addr, topic), "application/x-www-form-urlencoded", nil)
		if err != nil {
			LogToConsole(err.Error())
		} else {
			content, _ := io.ReadAll(resp.Body)
			LogToConsoleNoTrace("/topic/create", string(content))
		}

	}
	return nil
}

func LoadNsqConfig() {
	if ConfigNsq == nil {
		ConfigNsq = new(NsqConfig)
		tools.CreateConfigFromYml("./app.yaml", "nsq", &ConfigNsq)
	}
}
func InitNsq() {
	if NsqProducer == nil {
		//创建一个生产者
		conf := nsq.NewConfig()
		producer, err := nsq.NewProducer(ConfigNsq.PubAddr, conf)
		//producer.SetLoggerLevel(nsq.LogLevelError)
		logger := new(MyNsqLogger)
		producer.SetLogger(logger, nsq.LogLevelError)
		if err != nil {
			log.Fatal(err)
		}
		NsqProducer = producer
		//创建两个消费者
		// 一个全局公开消息的消费者
		publicChannel := fmt.Sprintf("%s_%s", ConfigService.AppId, ConfigService.MachineCode)
		go CreateNsqHandler(EzPublicTopic, publicChannel, func(content []byte) error {
			DispatchToMany(EventPublicKey, content, context.TODO())
			return nil
		})
		// 一个全局私有消息的消费者
		privateChannel := fmt.Sprintf("%s_%s", EzPublicChannel, ConfigService.MachineCode)
		go CreateNsqHandler(ConfigService.AppId, privateChannel, func(content []byte) error {
			DispatchToMany(EventPrivateKey, content, context.TODO())
			return nil
		})
	}
}

type nsqMessageHandler struct {
	Handler func(content []byte) error
}

// HandleMessage implements the Handler interface.
func (h *nsqMessageHandler) HandleMessage(m *nsq.Message) error {
	if len(m.Body) == 0 {
		// Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
		// In this case, a message with an empty body is simply ignored/discarded.
		return nil
	}

	// do whatever actual message processing is desired
	err := h.Handler(m.Body)

	// Returning a non-nil error will automatically send a REQ command to NSQ to re-queue the message.
	return err
}

func CreateNsqHandler(topic string, channel string, handler func(content []byte) error) {
	// Instantiate a consumer that will subscribe to the provided channel.
	conf := nsq.NewConfig()
	consumer, err := nsq.NewConsumer(topic, channel, conf)
	logger := new(MyNsqLogger)
	consumer.SetLogger(logger, nsq.LogLevelError)
	if err != nil {
		log.Fatal(err)
	}

	// Set the Handler for messages received by this Consumer. Can be called multiple times.
	// See also AddConcurrentHandlers.
	consumer.AddHandler(&nsqMessageHandler{
		Handler: handler,
	})

	// Use nsqlookupd to discover nsqd instances.
	// See also ConnectToNSQD, ConnectToNSQDs, ConnectToNSQLookupds.
	err = consumer.ConnectToNSQLookupd(ConfigNsq.SubAddr)
	if err != nil {
		log.Fatal(err)
	}

	//// wait for signal to exit
	//sigChan := make(chan os.Signal, 1)
	//signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	//<-sigChan
	//
	//// Gracefully stop the consumer.
	//consumer.Stop()
}
